diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py
index 2336883d..aa40b69b 100644
--- a/xarray/core/dataset.py
+++ b/xarray/core/dataset.py
@@ -73,7 +73,7 @@ from xarray.core.merge import (
 )
 from xarray.core.missing import get_clean_interp_index
 from xarray.core.options import OPTIONS, _get_keep_attrs
-from xarray.core.pycompat import array_type, is_duck_array, is_duck_dask_array
+from xarray.core.parallel_computation_interface import ParallelComputationInterface
 from xarray.core.types import QuantileMethods, T_Dataset
 from xarray.core.utils import (
     Default,
@@ -741,25 +741,40 @@ class Dataset(
         --------
         dask.compute
         """
-        # access .data to coerce everything to numpy or dask arrays
-        lazy_data = {
-            k: v._data for k, v in self.variables.items() if is_duck_dask_array(v._data)
-        }
-        if lazy_data:
-            import dask.array as da
+        def compute(self, **kwargs):
+            """Manually trigger loading of this dataset's data from disk or a remote source into memory and return a new dataset. The original is left unaltered.
 
-            # evaluate all the dask arrays simultaneously
-            evaluated_data = da.compute(*lazy_data.values(), **kwargs)
+            This is particularly useful when working with many file objects on disk.
 
-            for k, data in zip(lazy_data, evaluated_data):
-                self.variables[k].data = data
+            Parameters
+            ----------
+            **kwargs : dict
+                Additional keyword arguments passed on to the computation interface's compute method.
 
-        # load everything else sequentially
-        for k, v in self.variables.items():
-            if k not in lazy_data:
-                v.load()
+            See Also
+            --------
+            ParallelComputationInterface.compute
+            """
+            # access .data to coerce everything to numpy or computation interface arrays
+            lazy_data = {
+                k: v._data for k, v in self.variables.items() if is_duck_dask_array(v._data)
+            }
+            if lazy_data:
+                # Create an instance of the computation interface
+                computation_interface = ParallelComputationInterface()
 
-        return self
+                # evaluate all the computation interface arrays simultaneously
+                evaluated_data = computation_interface.compute(*lazy_data.values(), **kwargs)
+
+                for k, data in zip(lazy_data, evaluated_data):
+                    self.variables[k].data = data
+
+            # load everything else sequentially
+            for k, v in self.variables.items():
+                if k not in lazy_data:
+                    v.load()
+
+            return self
 
     def __dask_tokenize__(self):
         from dask.base import normalize_token
@@ -806,15 +821,15 @@ class Dataset(
 
     @property
     def __dask_optimize__(self):
-        import dask.array as da
-
-        return da.Array.__dask_optimize__
+        return self._parallel_computation_interface.get_optimize_function()
 
     @property
     def __dask_scheduler__(self):
-        import dask.array as da
+        return self._parallel_computation_interface.get_scheduler()
 
-        return da.Array.__dask_scheduler__
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._parallel_computation_interface = ParallelComputationInterface()
 
     def __dask_postcompute__(self):
         return self._dask_postcompute, ()
@@ -2227,11 +2242,11 @@ class Dataset(
         token : str, optional
             Token uniquely identifying this dataset.
         lock : bool, default: False
-            Passed on to :py:func:`dask.array.from_array`, if the array is not
-            already as dask array.
+            If the array is not already as dask array, this will be passed on to the
+            computation interface.
         inline_array: bool, default: False
-            Passed on to :py:func:`dask.array.from_array`, if the array is not
-            already as dask array.
+            If the array is not already as dask array, this will be passed on to the
+            computation interface.
         **chunks_kwargs : {dim: chunks, ...}, optional
             The keyword arguments form of ``chunks``.
             One of chunks or chunks_kwargs must be provided
@@ -2245,7 +2260,6 @@ class Dataset(
         Dataset.chunks
         Dataset.chunksizes
         xarray.unify_chunks
-        dask.array.from_array
         """
         if chunks is None and chunks_kwargs is None:
             warnings.warn(
@@ -2266,8 +2280,12 @@ class Dataset(
                 f"some chunks keys are not dimensions on this object: {bad_dims}"
             )
 
+        # Create an instance of the DaskComputationInterface
+        dask_interface = DaskComputationInterface()
+
         variables = {
-            k: _maybe_chunk(k, v, chunks, token, lock, name_prefix)
+            k: dask_interface.array_from_template(v, chunks, name_prefix=name_prefix, lock=lock, inline_array=inline_array)
+            if not is_duck_dask_array(v._data) else v._data.rechunk(chunks)
             for k, v in self.variables.items()
         }
         return self._replace(variables)
@@ -6394,8 +6412,7 @@ class Dataset(
         dask.dataframe.DataFrame
         """
 
-        import dask.array as da
-        import dask.dataframe as dd
+        from xarray.core.parallel_computation_interface import ParallelComputationInterface
 
         ordered_dims = self._normalize_dim_order(dim_order=dim_order)
 
